package com.atguigu.flink.chapter11.window;

import com.atguigu.flink.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Author lzc
 * @Date 2022/7/12 11:12
 */
public class Flink02_TVF_2 {
    public static void main(String[] args) throws Exception {
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<WaterSensor> stream = env
            .fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_2", 5000L, 50),
                new WaterSensor("sensor_1", 6001L, 60)
            )
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((ws, ts) -> ws.getTs())
            );
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        
        Table table = tEnv.fromDataStream(stream, $("id"), $("ts"), $("vc"), $("et").rowtime());
        tEnv.createTemporaryView("sensor", table);
        
        
        /*tEnv
            .sqlQuery("select " +
                          " id, " +
                          " window_start, " +
                          " window_end, " +
                          " sum(vc) vc_sum " +
                          "from table( tumble( table sensor, descriptor(et), interval '5' second )) " +
                          "group by id, window_start, window_end " +
                
                          "union " +
                
                          "select " +
                          " '' as id, " +
                          " window_start, " +
                          " window_end, " +
                          " sum(vc) vc_sum " +
                          "from table( tumble( table sensor, descriptor(et), interval '5' second )) " +
                          "group by window_start, window_end " )
            .execute()
            .print();*/
    
        tEnv
            .sqlQuery("select " +
                          " id, " +
                          " window_start, " +
                          " window_end, " +
                          " sum(vc) vc_sum " +
                          "from table( tumble( table sensor, descriptor(et), interval '5' second )) " +
//                          "group by window_start, window_end, grouping sets( (id), () ) ")
//                          "group by window_start, window_end, rollup(id) ")
                          "group by window_start, window_end, cube(id) ")
            .execute()
            .print();
    
        // group by window_start, window_end,  a, b, c, d
        // group by window_start, window_end,  a, b, c
        // group by window_start, window_end,  a, b
        // group by window_start, window_end,  a
        // group by window_start, window_end
        
        // group by window_start, window_end, grouping sets( (a,b,c,d), (a,b,c), (a,b),(a), ())
        // 等价于  ROLLUP(a,b,c,d)
        
        // group by window_start, window_end,  a, b, c
        // group by window_start, window_end,  a, b
        // group by window_start, window_end,  a, c
        // group by window_start, window_end,  b, c
        // group by window_start, window_end,  a
        // group by window_start, window_end,  b
        // group by window_start, window_end,  c
        // group by window_start, window_end
        // 等价于: cube(a,b,c)
    
    }
}
